<?php

namespace app\service\alone\MessageQueue;

use app\service\alone\BaseService;
use app\service\alone\Cache\RedisService;
use app\service\interfaces\MessageQueueInterface;

/**
 * 消息队列结构
 *
 * // 频道端，类型 hash
 * channel => ['[channel]' => "消息位置"]
 *
 * // 消息队列，类型 list
 * message-[channel] => ["", "", "", ""]
 *
 * // 客户端
 * client-[channel] => ["[client]" => 消息位置]
 *
 */
class RedisMessageQueue extends BaseService implements MessageQueueInterface
{
    /** @var $server RedisService */
    private $server;

    protected function init()
    {
        $this->server = RedisService::getInstance();
    }

    /**
     * 订阅频道
     *
     * @param array $channel
     * @param string $client
     */
    public function subscribe(array $channel, string $client): void
    {
        foreach ($channel as $v) {
            // 此频道初始位置
            $position = $this->server->make()->hGet('channel', $v);

            // 初始化该客户端 此频道 起始位置
            $this->server->make()->hSet("client-$v", $client, $position === false ? 0 : $position);
        }
    }

    /**
     * 广播频道
     *
     * @param string $channel
     * @param array $data
     * @return void
     */
    public function publish(string $channel, array $data): void
    {
        // 不存在则设置频道初始位置
        $this->server->make()->hSetNx('channel', $channel, 0);

        // 添加新消息
        $this->server->push([json_encode($data)], "message-$channel");

        // 更新 该频道 位置
        $this->server->make()->hIncrBy('channel', $channel, 1);
    }

    /**
     * 获取频道中的数据
     *
     * @param string $channel
     * @param string $client
     * @param int $num
     * @return array
     * @throws \Exception
     */
    public function data(string $channel, string $client, int $num): array
    {
        // 位置
        $position = [
            // 该频道起始位置
            'channel' => $this->server->make()->hGet('channel', $channel) ?: '0',

            // 该客户端 此频道 起始位置
            'client' => $this->server->make()->hGet("client-$channel", $client),
        ];

        // 客户端未获取
        if (false === $position['client']) throw new \Exception();

        // 无新消息，则返回空
        if ($position['channel'] === $position['client']) return [];

        // 获取队列中的数据
        $list = $this->server->make()->lrange("message-$channel", $position['client'], $position['client'] + $num);

        // 更新此客户端 该频道 消息位置
        $this->server->make()->hIncrBy("client-$channel", $client, count($list));

        return is_array($list) ? $list : [];
    }

    /**
     * 关闭频道
     *
     * @param array $channel
     * @return void
     */
    public function close(array $channel): void
    {
        foreach ($channel as $v) {
            // 删除该频道
            $this->server->make()->hDel('channel', $v);

            // 删除该频道消息队列
            $this->server->clear("message-$v");

            // 删除该客户端频道
            $this->server->clear("client-$v");
        }
    }
}